Deduplicate shared pool code into reusable helpers#51
Draft
shsms wants to merge 12 commits into
Draft
Conversation
`PvPoolBoundsTracker` and `BatteryPoolBoundsTracker` carried byte-for-byte identical `run` loops — receive a snapshot, compute its bounds, broadcast them, and handle lag/closure — differing only in the per-pool aggregation and the log label. Replace both with a single generic `PoolBoundsTracker<S, Q>` that takes the aggregation as a plain `fn(&S) -> Vec<Bounds<Q>>` and a label string. The pool-specific aggregation stays in `pv_bounds_tracker` and `battery_bounds_tracker` as `compute_pool_bounds` free functions (with their existing tests), now passed into the shared tracker. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`telem_with_power_bounds`, `handles`, and the `last_snapshot` broadcast drainer were copy-pasted verbatim across the pool and bounds-tracker test modules. Collect them into a shared `#[cfg(test)] test_support` module — `last_snapshot` generic over the snapshot type — and have the test modules (and `new_pool`) use them. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The run loop logged "Failed to send component status" on every sample and every missing-data tick once the pool tracker dropped its mpsc receiver, but never exited, leaking the task and its broadcast subscription for the life of the process (scaling with each pool recreation). Break the loop on send failure, matching the RecvError::Closed arm, so the tracker shuts down when there is nothing left to report to. Fixes frequenz-floss#43. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`RecvError::Closed` means the upstream telemetry tracker dropped its sender, which happens on a normal teardown of the pool — the bounds tracker has nothing left to aggregate. Log it at debug rather than error, matching the no-receivers path just above. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The tick arm skips `send()` whenever the partitioning is unchanged, but a failed `send()` is the only thing that tells the tracker every receiver has gone. With a stable partition (e.g. all components silent, each re-emitting the same unhealthy status) the snapshot never changes, so the skip path is taken every tick and the dropped receivers are never noticed — the tracker and its component trackers leak until process exit. Check `receiver_count()` each tick, before the unchanged-skip, and break when it reaches zero. Applies to both the PV and battery pool telemetry trackers. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Both pool telemetry trackers now treat every exit from `run` as a normal shutdown: the loop ends when every component tracker has gone or every receiver has dropped, neither of which is an error. The trailing `error!` + `Err` return became a `debug!`, and the in-loop send-failure no longer logs at error. With shutdown no longer fallible, `run` has nothing left to report, so drop its `Result` return (the call sites only `tokio::spawn` it and discard the result). The remaining startup failures -- an empty component set and opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The group tracker logged at error and returned `Err` on three paths that are all ordinary shutdown: its inverter or battery component trackers all exiting (`recv` yields `None`), and the pool tracker dropping its receiver (the send to it fails). The last one fired on every `BatteryPool` teardown, so each group tracker spammed "Failed to send inverter-battery group status" at error level. Log these at debug and just return, matching how the pool telemetry trackers already treat their own shutdown. With shutdown no longer an error, `run` has nothing left to fail with, so drop its `Result` return; the remaining startup failures -- opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The per-component stream task exits once it sees no receivers and reports `StreamStatus::Ended`, but the actor only removed the component from the retry map — the now-dead `broadcast::Sender` stayed cached in `component_streams`. Any later subscription for that component was handed `tx.subscribe()` on that dead sender without a new tonic stream being started, so it never received telemetry. With the telemetry trackers now exiting cleanly instead of leaking (which used to hold receivers forever and mask this), every drop-pool/rebuild cycle hit it: the rebuilt pool saw its components as permanently silent. On `Ended`, drop the cached sender so the next subscription starts a fresh stream. If a subscriber arrived in the window between the stream task's no-receivers check and the actor processing `Ended`, the entry still has receivers — restart the stream for them instead of evicting it, since the old task is already gone either way. Fixes frequenz-floss#42. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`PvPoolSnapshot` and `InverterBatteryGroupStatus` each carried their own pair(s)
of `healthy`/`unhealthy` telemetry maps, and both telemetry trackers
open-coded the same insert-here / remove-there bookkeeping on every status
update.
Introduce `ComponentHealthPartition` — a `{ healthy, unhealthy }` pair with
`mark_healthy`/`mark_unhealthy` helpers — and rebuild both snapshot types on it
(`PvPoolSnapshot { inverters }`, `InverterBatteryGroupStatus { inverters,
batteries }`). The trackers now mutate partitions through the shared helpers,
and the PV tracker's unchanged-check compares whole partitions rather than
field by field, so a future field can't silently escape change detection.
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`BatteryPool::try_new` and `PvPool::try_new` carried the same two checks — reject an empty explicit set and reject IDs that aren't all of the right kind — differing only in the component kind and the noun used in error messages. Extract them into `validate_pool_ids`, parameterised by the matching-ID set and a noun, and call it from both pools. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`power_bounds` and `telemetry_snapshots` on both `BatteryPool` and `PvPool` repeated the same weak-sender reuse gate — upgrade the stored `WeakSender`, keep it only if it still has receivers, and subscribe — in four places. Pull it into a single `try_reuse` helper so the reuse policy lives in one spot. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The Battery and PV pool implementations carried large amounts of byte-for-byte
duplicated logic. This branch factors the shared parts into single
implementations, with no intended behavior change.
Changes
try_reusefor the broadcast weak-sender reuse gate thatpower_bounds/telemetry_snapshotsrepeated on both pools.PvPoolBoundsTracker/BatteryPoolBoundsTrackerrunloops with one generic
PoolBoundsTracker<S, Q>taking the per-poolaggregation as a
fn(&S) -> Vec<Bounds<Q>>.ComponentHealthPartitionand rebuildPvPoolSnapshotandInverterBatteryGroupStatuson it. Public API: new crate-root export;the two snapshot types' fields are reshaped to carry it.
validate_pool_idsfor the shared component-ID checks.test_supportmodule.